{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Helpdesk Hard"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Loading spark-stubs, spark-hive\n",
      "Adding Hive conf dir /opt/hive/conf to classpath\n",
      "Creating SparkSession\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "SLF4J: No SLF4J providers were found.\n",
      "SLF4J: Defaulting to no-operation (NOP) logger implementation\n",
      "SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a target=\"_blank\" href=\"http://jupyter:4040\">Spark UI</a>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$                                  \n",
       "\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.log4j.{Level, Logger}\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.types._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.functions._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.expressions.Window\n",
       "\n",
       "\u001b[39m\n",
       "\u001b[36mspark\u001b[39m: \u001b[32mSparkSession\u001b[39m = org.apache.spark.sql.SparkSession@7dac6a87"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import $ivy.`org.apache.spark::spark-sql:3.4.0`\n",
    "\n",
    "import org.apache.log4j.{Level, Logger}\n",
    "Logger.getLogger(\"org\").setLevel(Level.OFF)\n",
    "\n",
    "import org.apache.spark._\n",
    "import org.apache.spark.sql._\n",
    "import org.apache.spark.sql.types._\n",
    "import org.apache.spark.sql.functions._\n",
    "import org.apache.spark.sql.expressions.Window\n",
    "\n",
    "val spark = {\n",
    "    NotebookSparkSession.builder()\n",
    "    .progress(false)\n",
    "    .appName(\"app12-3\")\n",
    "    // .master(\"spark://192.168.31.31:7077\")\n",
    "    .master(\"local[*]\")\n",
    "    .config(\"spark.sql.warehouse.dir\", \n",
    "            \"hdfs://192.168.31.31:9000/user/hive/warehouse\") \n",
    "    .config(\"spark.cores.max\", \"4\") \n",
    "    .config(\"spark.executor.instances\", \"1\") \n",
    "    .config(\"spark.executor.cores\", \"2\") \n",
    "    .config(\"spark.executor.memory\", \"10g\") \n",
    "    .config(\"spark.shuffle.service.enabled\", \"false\") \n",
    "    .config(\"spark.dynamicAllocation.enabled\", \"false\") \n",
    "    .config(\"spark.sql.catalogImplementation\", \"hive\")\n",
    "    .config(\"spark.sql.repl.eagerEval.enabled\", \"true\")\n",
    "    .config(\"spark.driver.allowMultipleContexts\", \"true\")\n",
    "    .getOrCreate()\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36mspark.implicits._\n",
       "\u001b[39m\n",
       "defined \u001b[32mfunction\u001b[39m \u001b[36msc\u001b[39m\n",
       "\u001b[36mhiveCxt\u001b[39m: \u001b[32msql\u001b[39m.\u001b[32mhive\u001b[39m.\u001b[32mHiveContext\u001b[39m = org.apache.spark.sql.hive.HiveContext@343aa440"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import spark.implicits._\n",
    "def sc = spark.sparkContext\n",
    "val hiveCxt = new org.apache.spark.sql.hive.HiveContext(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined \u001b[32mclass\u001b[39m \u001b[36mRichDF\u001b[39m"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Credit to Aivean\n",
    "implicit class RichDF(val ds:DataFrame) {\n",
    "    def showHTML(limit: Int = 50, truncate: Int = 100) = {\n",
    "        import xml.Utility.escape\n",
    "        val data = ds.take(limit)\n",
    "        val header = ds.schema.fieldNames.toSeq        \n",
    "        val rows: Seq[Seq[String]] = data.map { row =>\n",
    "          row.toSeq.map {cell =>\n",
    "            val str = cell match {\n",
    "              case null => \"null\"\n",
    "              case binary: Array[Byte] => binary.map(\"%02X\".format(_)).mkString(\"[\", \" \", \"]\")\n",
    "              case array: Array[_] => array.mkString(\"[\", \", \", \"]\")\n",
    "              case seq: Seq[_] => seq.mkString(\"[\", \", \", \"]\")\n",
    "              case _ => cell.toString\n",
    "            }\n",
    "            if (truncate > 0 && str.length > truncate) {\n",
    "              // do not show ellipses for strings shorter than 4 characters.\n",
    "              if (truncate < 4) str.substring(0, truncate)\n",
    "              else str.substring(0, truncate - 3) + \"...\"\n",
    "            } else {\n",
    "              str\n",
    "            }\n",
    "          }: Seq[String]\n",
    "        }\n",
    "    publish.html(s\"\"\" <table>\n",
    "                <tr>\n",
    "                 ${header.map(h => s\"<th>${escape(h)}</th>\").mkString}\n",
    "                </tr>\n",
    "                ${rows.map {row =>\n",
    "                  s\"<tr>${row.map{c => s\"<td>${escape(c)}</td>\" }.mkString}</tr>\"\n",
    "                }.mkString}\n",
    "            </table>\n",
    "        \"\"\")\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[36mshift\u001b[39m: \u001b[32mDataFrame\u001b[39m = [shift_date: string, shift_type: string ... 4 more fields]\n",
       "\u001b[36mstaff\u001b[39m: \u001b[32mDataFrame\u001b[39m = [staff_code: string, first_name: string ... 2 more fields]\n",
       "\u001b[36missue\u001b[39m: \u001b[32mDataFrame\u001b[39m = [call_date: string, call_ref: int ... 5 more fields]\n",
       "\u001b[36mshift_type\u001b[39m: \u001b[32mDataFrame\u001b[39m = [shift_type: string, start_time: string ... 1 more field]\n",
       "\u001b[36mlevel\u001b[39m: \u001b[32mDataFrame\u001b[39m = [level_code: int, manager: string ... 2 more fields]\n",
       "\u001b[36mcustomer\u001b[39m: \u001b[32mDataFrame\u001b[39m = [company_ref: int, company_name: string ... 6 more fields]\n",
       "\u001b[36mcaller\u001b[39m: \u001b[32mDataFrame\u001b[39m = [caller_id: int, company_ref: int ... 2 more fields]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val shift = hiveCxt.table(\"sqlzoo.Shift\")\n",
    "val staff = hiveCxt.table(\"sqlzoo.Staff\")\n",
    "val issue = hiveCxt.table(\"sqlzoo.Issue\")\n",
    "val shift_type = hiveCxt.table(\"sqlzoo.Shift_type\")\n",
    "val level = hiveCxt.table(\"sqlzoo.Level\")\n",
    "val customer = hiveCxt.table(\"sqlzoo.Customer\")\n",
    "val caller = hiveCxt.table(\"sqlzoo.Caller\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 11.\n",
    "Show the manager and number of calls received for each hour of the day on 2017-08-12\n",
    "\n",
    "```\n",
    "+---------+---------------+----+\n",
    "| Manager | Hr            | cc |\n",
    "+---------+---------------+----+\n",
    "| LB1     | 2017-08-12 08 |  6 |\n",
    "| LB1     | 2017-08-12 09 | 16 |\n",
    "| LB1     | 2017-08-12 10 | 11 |\n",
    "| LB1     | 2017-08-12 11 |  6 |\n",
    "| LB1     | 2017-08-12 12 |  8 |\n",
    "| LB1     | 2017-08-12 13 |  4 |\n",
    "| AE1     | 2017-08-12 14 | 12 |\n",
    "| AE1     | 2017-08-12 15 |  8 |\n",
    "| AE1     | 2017-08-12 16 |  8 |\n",
    "| AE1     | 2017-08-12 17 |  7 |\n",
    "| AE1     | 2017-08-12 19 |  5 |\n",
    "+---------+---------------+----+\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>Manager</th><th>hr</th><th>cc</th>\n",
       "                </tr>\n",
       "                <tr><td>LB1</td><td>2017-08-12 08</td><td>6</td></tr><tr><td>LB1</td><td>2017-08-12 09</td><td>16</td></tr><tr><td>LB1</td><td>2017-08-12 10</td><td>11</td></tr><tr><td>LB1</td><td>2017-08-12 11</td><td>6</td></tr><tr><td>LB1</td><td>2017-08-12 12</td><td>8</td></tr><tr><td>LB1</td><td>2017-08-12 13</td><td>4</td></tr><tr><td>AE1</td><td>2017-08-12 14</td><td>12</td></tr><tr><td>AE1</td><td>2017-08-12 15</td><td>8</td></tr><tr><td>AE1</td><td>2017-08-12 16</td><td>8</td></tr><tr><td>AE1</td><td>2017-08-12 17</td><td>7</td></tr><tr><td>AE1</td><td>2017-08-12 19</td><td>5</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(issue.withColumn(\"date_call\", to_date(issue(\"Call_date\")))\n",
    " .withColumn(\"hr\", date_format(issue(\"Call_date\"), \"yyyy-MM-dd HH\"))\n",
    " .filter(col(\"date_call\")===\"2017-08-12\")\n",
    " .join(shift, (col(\"Taken_by\")===col(\"Operator\")) && \n",
    "       (col(\"date_call\")===col(\"Shift_date\")))\n",
    " .groupBy(\"Manager\", \"hr\")\n",
    " .agg(count(\"Call_ref\").alias(\"cc\"))\n",
    " .orderBy(\"hr\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 12.\n",
    "**80/20 rule. It is said that 80% of the calls are generated by 20% of the callers. Is this true? What percentage of calls are generated by the most active 20% of callers.**\n",
    "\n",
    "Note - Andrew has not managed to do this in one query - but he believes it is possible.\n",
    "\n",
    "```\n",
    "+---------+\n",
    "| t20pc   |\n",
    "+---------+\n",
    "| 32.2581 |\n",
    "+---------+\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>t20pc</th>\n",
       "                </tr>\n",
       "                <tr><td>32.2581</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(issue.groupBy(\"Caller_id\")\n",
    " .agg(count(\"Caller_id\").alias(\"n\"))\n",
    " .orderBy(col(\"n\").desc)\n",
    " .limit((issue.select(\"Caller_id\").distinct().count() * 0.2).floor.toInt)\n",
    " .agg(sum(\"n\").alias(\"sum_n\"))\n",
    " .withColumn(\"t20pc\", round(lit(100) * col(\"sum_n\") / issue.count(), 4))\n",
    " .select(\"t20pc\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 13.\n",
    "**Annoying customers. Customers who call in the last five minutes of a shift are annoying. Find the most active customer who has never been annoying.**\n",
    "\n",
    "```\n",
    "+--------------+------+\n",
    "| Company_name | abna |\n",
    "+--------------+------+\n",
    "| High and Co. |   20 |\n",
    "+--------------+------+\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>Company_ref</th><th>Company_name</th><th>abna</th>\n",
       "                </tr>\n",
       "                <tr><td>146</td><td>High and Co.</td><td>20</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[36mannoy\u001b[39m: \u001b[32mDataset\u001b[39m[\u001b[32mRow\u001b[39m] = [Company_ref: int, Shift_date: string ... 4 more fields]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val annoy = (issue.withColumn(\"date_call\", to_date(issue(\"Call_date\")))\n",
    "         .join(shift, (col(\"Taken_by\")===col(\"Operator\")) && \n",
    "                      (col(\"date_call\")===col(\"Shift_date\")))\n",
    "         .join(shift_type, \"Shift_type\")\n",
    "         .join(caller, \"Caller_id\")\n",
    "         .join(customer, \"Company_ref\")\n",
    "         .select(\"Company_ref\", \"Shift_date\", \"End_time\", \"Call_date\")\n",
    "         .withColumn(\"Shift_end\", concat_ws(\n",
    "             \" \", col(\"Shift_date\").cast(\"string\"), col(\"End_time\")))\n",
    "         .withColumn(\"till_shiftend\", \n",
    "                     (to_timestamp(col(\"Shift_end\")) - to_timestamp(col(\"Call_date\")))\n",
    "                     .cast(\"long\"))\n",
    "         .filter(col(\"till_shiftend\") <= 5*60))\n",
    "\n",
    "(issue.join(caller, \"Caller_id\")\n",
    " .join(customer, \"Company_ref\")\n",
    " .join(annoy, \"Company_ref\", joinType=\"left_anti\")\n",
    " .groupBy(\"Company_ref\", \"Company_name\")\n",
    " .agg(count(\"Caller_id\").alias(\"abna\"))\n",
    " .orderBy(col(\"abna\").desc)\n",
    " .limit(1)\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 14.\n",
    "**Maximal usage. If every caller registered with a customer makes a call in one day then that customer has \"maximal usage\" of the service. List the maximal customers for 2017-08-13.**\n",
    "\n",
    "```\n",
    "+-------------------+--------------+-------------+\n",
    "| company_name      | caller_count | issue_count |\n",
    "+-------------------+--------------+-------------+\n",
    "| Askew Inc.        |            2 |           2 |\n",
    "| Bai Services      |            2 |           2 |\n",
    "| Dasher Services   |            3 |           3 |\n",
    "| High and Co.      |            5 |           5 |\n",
    "| Lady Retail       |            4 |           4 |\n",
    "| Packman Shipping  |            3 |           3 |\n",
    "| Pitiable Shipping |            2 |           2 |\n",
    "| Whale Shipping    |            2 |           2 |\n",
    "+-------------------+--------------+-------------+\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>Company_name</th><th>caller_count</th><th>issue_count</th>\n",
       "                </tr>\n",
       "                <tr><td>Askew Inc.</td><td>2</td><td>2</td></tr><tr><td>Bai Services</td><td>2</td><td>2</td></tr><tr><td>Dasher Services</td><td>3</td><td>3</td></tr><tr><td>High and Co.</td><td>5</td><td>5</td></tr><tr><td>Lady Retail</td><td>4</td><td>4</td></tr><tr><td>Packman Shipping</td><td>3</td><td>3</td></tr><tr><td>Pitiable Shipping</td><td>2</td><td>2</td></tr><tr><td>Whale Shipping</td><td>2</td><td>2</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[36miss\u001b[39m: \u001b[32mDataFrame\u001b[39m = [Caller_id: int, Company_ref: int ... 3 more fields]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val iss = (issue.filter((col(\"Call_date\")>=\"2017-08-13\") &&\n",
    "                    (col(\"Call_date\")<\"2017-08-14\"))\n",
    "       .join(caller, \"Caller_id\", joinType=\"right\")\n",
    "       .join(customer, \"Company_ref\", joinType=\"left\")\n",
    "       .groupBy(\"Caller_id\", \"Company_ref\", \"Company_name\")\n",
    "       .agg(count(\"Call_ref\").alias(\"n\"))\n",
    "       .withColumn(\"iss\", (col(\"n\")>0).cast(\"integer\")))\n",
    "\n",
    "(iss.groupBy(\"Company_name\")\n",
    " .agg(count(\"Company_ref\").alias(\"caller_count\"),\n",
    "      sum(\"iss\").alias(\"issue_count\"))\n",
    " .filter(col(\"caller_count\")===col(\"issue_count\"))\n",
    " .orderBy(\"Company_name\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 15.\n",
    "**Consecutive calls occur when an operator deals with two callers within 10 minutes. Find the longest sequence of consecutive calls – give the name of the operator and the first and last call date in the sequence.**\n",
    "\n",
    "```\n",
    "+----------+---------------------+---------------------+-------+\n",
    "| taken_by | first_call          | last_call           | calls |\n",
    "+----------+---------------------+---------------------+-------+\n",
    "| AB1      | 2017-08-14 09:06:00 | 2017-08-14 10:17:00 |    24 |\n",
    "+----------+---------------------+---------------------+-------+\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>Taken_by</th><th>first_call</th><th>last_call</th><th>n_calls</th>\n",
       "                </tr>\n",
       "                <tr><td>AB1</td><td>2017-08-14 09:06:00.0</td><td>2017-08-14 10:17:00.0</td><td>24</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(issue.withColumn(\"lag\", lag(\"Call_date\", 1).over(\n",
    "    Window.partitionBy(\"Taken_by\").orderBy(\"Taken_by\", \"Call_date\")))\n",
    " .withColumn(\"flag\", ((to_timestamp(col(\"Call_date\")) - \n",
    "                       to_timestamp(col(\"lag\"))).cast(\"long\") > 10*60).cast(\"integer\"))\n",
    " .withColumn(\"grp\", sum(\"flag\").over(\n",
    "     Window.partitionBy(\"Taken_by\").orderBy(\"Taken_by\", \"Call_date\")))\n",
    " .groupBy(\"Taken_by\", \"grp\")\n",
    " .agg(min(\"Call_date\").alias(\"first_call\"),\n",
    "      max(\"Call_date\").alias(\"last_call\"),\n",
    "      count(\"Caller_id\").alias(\"n_calls\"))\n",
    " .orderBy(col(\"n_calls\").desc)\n",
    " .select(\"Taken_by\", \"first_call\", \"last_call\", \"n_calls\")\n",
    " .limit(1)\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Scala",
   "language": "scala",
   "name": "scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".sc",
   "mimetype": "text/x-scala",
   "name": "scala",
   "nbconvert_exporter": "script",
   "version": "2.12.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
